package 函数式编程.使用Stream;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class parallelStream方法 {
	public static void main(String[] args) {
      List<String> str = new ArrayList<>();
      str.add("1");
      str.add("2");
      str.add("3");
      str.add("4");
      str.add("5");
      str.add("6");
      
      /*单个线程执行任务*/
      str.stream().filter(e -> {
          System.out.println(Thread.currentThread().getName() + "\t过滤" + e);
          return Integer.parseInt(e) % 2 == 0 ? true : false;
      }).collect(Collectors.toList());
      
      str.parallelStream().filter(e -> {
          System.out.println(Thread.currentThread().getName() + "\t过滤" + e);
        return Integer.parseInt(e) % 2 == 0 ? true : false;
      }).collect(Collectors.toList());

	}
}

/*
parallelStream浅析

stream流式计算是Java8中新添的一种新特性，可以把stream流理解为串行的流式计算，而parallelStream是一种多线程并行流式计算
JAVA8中引入了lamda表达式和Stream接口。其丰富的API及强大的表达能力极大的简化代码，提升了效率，同时还通过parallelStream提供并发操作的支持，本文
探讨parallelStream方法的使用。
在stream()中任务的调度执行是串行化的，需要执行完了其中一个任务执行下一个，而parallelStream是利用多线程进行的，这可以很大程度简化我们使用并发操作。


首先看下java doc中对parallelStream的定义。

    A sequence of elements supporting sequential and parallel aggregate operations.
 ...
 Stream pipelines may execute either sequentially or in parallel. This execution mode is a property of the stream. 
 Streams are created with an initial choice of sequential or parallel execution. (For example, Collection.stream() 
creates a sequential stream, and Collection.parallelStream() creates a parallel one.) This choice of execution mode 
 may be modified by the BaseStream.sequential() or BaseStream.parallel() methods, and may be queried with the 
 BaseStream.isParallel() method.

既然可以并行的执行，废话不多说，先看一个例子。
class Person {
        int    id;
        String name;
        String sex;
        float  height;

        public Person(int id, String name, String sex, float height) {
            this.id = id;
            this.name = name;
            this.sex = sex;
            this.height = height;
        }
}

    //构造数据
    public List<Person> constructPersons() {

        List<Person> persons = new ArrayList<Person>();
        for (int i = 0; i < 5; i++) {
            Person p = new Person(i, "name" + i, "sex" + i, i);
            persons.add(p);
        }
        return persons;
    }

    public void doFor(List<Person> persons) {
        long start = System.currentTimeMillis();

        for (Person p : persons) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(p.name);
        }

        long end = System.currentTimeMillis();
        System.out.println("doFor cost:" + (end - start));
    }

     //顺序流
    public void doStream(List<Person> persons) {
        long start = System.currentTimeMillis();

        persons.stream().forEach(x -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(x.name);
        });

        long end = System.currentTimeMillis();
        System.out.println("doStream cost:" + (end - start));
    }

     //并行流
    public void doParallelStream(List<Person> persons) {

        long start = System.currentTimeMillis();

        persons.parallelStream().forEach(x -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {}
            System.out.println(x.name);
        });

        long end = System.currentTimeMillis();

        System.out.println("doParallelStream cost:" + (end - start));
    }

执行结果：

name0
name1
name2
name3
name4
doFor cost:5021
name0
name1
name2
name3
name4
doStream cost:5076
name4
name0
name2
name3
name1
doParallelStream cost:1010

代码上 stream 和 parallelStream 语法差异较小，从执行结果来看，stream顺序输出，而parallelStream 无序输出；parallelStream 执行耗
时是 stream 的五分之一。可以看到在当前测试场景下，parallelStream 获得的相对较好的执行性能，那parallelStream背后到底是什么呢？
要深入了解parallelStream，首先要弄明白ForkJoin框架和ForkJoinPool。ForkJoin框架是java7中提供的并行执行框架，他的策略是分而治之。说
白了，就是把一个大的任务切分成很多小的子任务，子任务执行完毕后，再把结果合并起来。

顺便说下ForkJoin框架和ThreadPoolExecutor的区别，ForkJoin框架可以使用数量有限的线程数，执行大量任务，并且这些任务之间是有父子依赖的，必
须是子任务执行完成后，父任务才能执行。ThreadPoolExecutor 显然是无法支持这种场景的。而ForkJoin框架，可以让其中的线程创建新的任务，并挂起
当前的任务，任务以及子任务会保留在一个内部队列中，此时线程就能够从队列中选择任务顺序执行。

Java 8为ForkJoinPool添加了一个通用线程池，这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素，
它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时，自动并行化就会发生。比如用来排序一个数组的并行快速排序，用来
对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。

上面的代码中，forEach方法会为每个元素的操作创建一个任务，该任务会被前文中提到的ForkJoinPool中的通用线程池处理。以上的并行计算逻辑当然也可以使
用ThreadPoolExecutor完成，但是就代码的可读性和代码量而言，使用ForkJoinPool明显更胜一筹。

默认线程池的数量就是处理器的数量，特殊场景下可以使用系统属性：-Djava.util.concurrent.ForkJoinPool.common.parallelism={N} 调整。

对上面例子做下调整，sleep时间变为2ms，

Thread.sleep(2);

执行结果如下：

doFor cost:12
=======================
doParallelStream cost:62
=======================
doStream cost:13

doParallelStream耗时最多，可见并不是并行执行就是性能最好的，要根据具体的应用场景测试分析。这个例子中，每个子任务执行时间较短，而线程切换消耗了大量时间。


说到了并发，不得不提线程安全。先看一个例子：

public void doThreadUnSafe() {
        List<Integer> listFor = new ArrayList<>();
        List<Integer> listParallel = new ArrayList<>();

        IntStream.range(0, 1000).forEach(listFor::add);
        IntStream.range(0, 1000).parallel().forEach(listParallel::add);

        System.out.println("listFor size :" + listFor.size());
        System.out.println("listParallel size :" + listParallel.size());
    }

输出结果：

listFor size :1000
listParallel size :949
显而易见，stream.parallel.forEach()中执行的操作并非线程安全。如果需要线程安全，可以把集合转换为同步集合，
即：Collections.synchronizedList(new ArrayList<>())。

总结下来如下：
1.使用parallelStream可以简洁高效的写出并发代码。
2.parallelStream并行执行是无序的。
3.parallelStream提供了更简单的并发执行的实现，但并不意味着更高的性能，它是使用要根据具体的应用场景。如果cpu资源紧张parallelStream不会带来性能提升；
	如果存在频繁的线程切换反而会降低性能。
4.任务之间最好是状态无关的，因为parallelStream默认是非线程安全的，可能带来结果的不确定性。*/